This time you'll find yourself delving into the heart (and other innards) of recurrent neural networks on a class of toy problems.
Struggling to find a name for a variable? Then imagine how hard it is to come up with a name for your son or daughter. Surely no human has real expertise in what makes a good child's name, so let's train an RNN to do it instead.
It's dangerous to go alone, take these:
In [ ]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
In [ ]:
import os
start_token = " "
with open("names") as f:
    lines = f.read()[:-1].split('\n')
    lines = [start_token + name for name in lines]
In [ ]:
print('n samples = ', len(lines))
for x in lines[::1000]:
    print(x)
In [ ]:
MAX_LENGTH = max(map(len, lines))
print("max length =", MAX_LENGTH)
plt.title('Sequence length distribution')
plt.hist(list(map(len, lines)), bins=25)
In [ ]:
# get all unique characters from lines (including capital letters and symbols)
tokens = <YOUR CODE>
tokens = list(tokens)
n_tokens = len(tokens)
print('n_tokens = ', n_tokens)
assert 50 < n_tokens < 60
In [ ]:
# dictionary of symbol -> its identifier (index in tokens list)
token_to_id = <YOUR CODE>
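If you get stuck on the two cells above, one possible approach (a sketch, not necessarily the intended solution) is to pour all characters into a set and enumerate it:
In [ ]:
# sketch: unique characters across all names, and a char -> index lookup
tokens = list(set(''.join(lines)))
token_to_id = {token: idx for idx, token in enumerate(tokens)}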
In [ ]:
assert len(tokens) == len(token_to_id), "dictionaries must have same size"
for i in range(n_tokens):
    assert token_to_id[tokens[i]] == i, "token identifier must be its position in tokens list"
print("Seems alright!")
In [ ]:
def to_matrix(names, max_len=None, pad=token_to_id[' '], dtype='int32'):
"""Casts a list of names into rnn-digestable matrix"""
max_len = max_len or max(map(len, names))
names_ix = np.zeros([len(names), max_len], dtype) + pad
for i in range(len(names)):
name_ix = list(map(token_to_id.get, names[i]))
names_ix[i, :len(name_ix)] = name_ix
return names_ix
In [ ]:
# Example: cast a few names to a matrix; shorter names are padded with the space token
print('\n'.join(lines[::2000]))
print(to_matrix(lines[::2000]))
We can rewrite a recurrent neural network as a consecutive application of a dense layer to the input $x_t$ and the previous RNN state $h_t$. This is exactly what we're going to do now.
Since we're training a language model, there should also be an embedding layer that converts character ids into vectors and an output layer that predicts the probabilities of the next character.
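Concretely, one step of such a network can be written as $h_{t+1} = \tanh\big(W\,[\mathrm{emb}(x_t);\,h_t] + b\big)$ and $P(x_{t+1} \mid h_{t+1}) = \mathrm{softmax}(V\,h_{t+1} + c)$, where $[\cdot\,;\,\cdot]$ denotes concatenation and the names $W, b, V, c$ are only for illustration: they correspond to the dense layers defined below.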
In [ ]:
import tensorflow as tf
import keras
import keras.layers as L
emb_size, rnn_size = 16, 64
# an embedding layer that converts character ids into embeddings
embed_x = L.Embedding(n_tokens, emb_size)
# a dense layer that maps input and previous state to new hidden state, [x_t,h_t] -> h_t+1
get_h_next = L.Dense(rnn_size, activation='tanh')
# a dense layer that maps current hidden state to probabilities of characters [h_t+1]->P(x_t+1|h_t+1)
get_probas = L.Dense(n_tokens, activation='softmax')
In [ ]:
def rnn_one_step(x_t, h_t):
"""
Recurrent neural network step that produces next state and output
given prev input and previous state.
We'll call this method repeatedly to produce the whole sequence.
:param x_t: token vector, int32[batch_size,]
:param h_t: previous state matrix, float32[batch_size, rnn_size]
Follow isntructions to complete the function.
"""
# 1. convert character id into embedding (use embed_x layer)
x_t_emb = embed_x(tf.reshape(x_t, [-1, 1]))[:, 0]
# 2. concatenate x _embedding_ and previous h state (over last axis)
<YOUR CODE>
# 3. compute next state given h and x embedding
<YOUR CODE>
# 4. get probabilities for language model P(x_next | h_next)
<YOUR CODE>
return next_h, next_probas
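If you need a hint for steps 2-4, here is a minimal sketch of one way they could look, assuming concatenation along the last axis (not necessarily the intended solution):
In [ ]:
# sketch: concatenate the embedding with the previous state, then apply the two dense layers
x_and_h = tf.concat([x_t_emb, h_t], axis=-1)
next_h = get_h_next(x_and_h)
next_probas = get_probas(next_h)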
In [ ]:
input_sequence = tf.placeholder('int32', (None, MAX_LENGTH))
batch_size = tf.shape(input_sequence)[0]
# initial hidden state
h0 = tf.zeros([batch_size, rnn_size])
In [ ]:
# TEST: single rnn step
h1, p_y1 = rnn_one_step(input_sequence[:, 0], h0)
dummy_data = np.arange(MAX_LENGTH * 2).reshape([2, -1])
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())
test_h1, test_p_y1 = sess.run([h1, p_y1], {input_sequence: dummy_data})
assert test_h1.shape == (len(dummy_data), rnn_size)
assert test_p_y1.shape == (len(dummy_data), n_tokens) and np.allclose(test_p_y1.sum(-1), 1)
In [ ]:
h_prev = h0
predicted_probas = []
for t in range(MAX_LENGTH):
    x_t = input_sequence[:, t]
    # Task: compute next token probas and next hidden state
    probas_next, h_next = <YOUR CODE>
    # END OF YOUR CODE
    predicted_probas.append(probas_next)
    h_prev = h_next
predicted_probas = tf.stack(predicted_probas, axis=1)
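A small hint for the step above: rnn_one_step returns the next state first and the probabilities second, so one way to fill it in (a sketch) is:
In [ ]:
# sketch: unpack (next state, next probabilities) in the order rnn_one_step returns them
h_next, probas_next = rnn_one_step(x_t, h_prev)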
In [ ]:
assert predicted_probas.shape.as_list() == [None, MAX_LENGTH, n_tokens]
assert h_prev.shape.as_list() == h0.shape.as_list()
In [ ]:
# predictions at step t must be compared against the token at step t+1, hence the shift below
predictions_matrix = predicted_probas[:, :-1]
answers_matrix = tf.one_hot(input_sequence[:, 1:], n_tokens)
print('predictions_matrix:', predictions_matrix.shape)
print('answers_matrix:', answers_matrix.shape)
In [ ]:
# define loss as categorical crossentropy. Mind that predictions are probabilities and NOT logits!
loss = <YOUR CODE>
optimize = tf.train.AdamOptimizer().minimize(loss)
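For reference, categorical crossentropy over probabilities (not logits) could be sketched as below; note that this simple version also averages over padded positions:
In [ ]:
# sketch: -sum over tokens of one_hot(answer) * log(predicted probability), averaged over batch and time
loss = -tf.reduce_mean(
    tf.reduce_sum(answers_matrix * tf.log(tf.clip_by_value(predictions_matrix, 1e-7, 1.0)), axis=-1))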
In [ ]:
from IPython.display import clear_output
from random import sample
sess.run(tf.global_variables_initializer())
history = []
In [ ]:
for i in range(1000):
    batch = to_matrix(sample(lines, 32), max_len=MAX_LENGTH)
    loss_i, _ = sess.run([loss, optimize], {input_sequence: batch})
    history.append(loss_i)
    if (i + 1) % 100 == 0:
        clear_output(True)
        plt.plot(history, label='loss')
        plt.legend()
        plt.show()
assert np.mean(history[:10]) > np.mean(history[-10:]), "RNN didn't converge."
In [ ]:
x_t = tf.placeholder('int32', (None,))
h_t = tf.Variable(np.zeros([1, rnn_size], 'float32'))
next_h, next_probs = rnn_one_step(x_t, h_t)
In [ ]:
def generate_sample(seed_phrase=' ', max_length=MAX_LENGTH):
    '''
    Generates text that continues the given seed phrase.
    :param seed_phrase: prefix characters that the RNN is asked to continue
    :param max_length: maximum output length, including seed_phrase
    '''
    x_sequence = [token_to_id[token] for token in seed_phrase]
    sess.run(tf.variables_initializer([h_t]))
    # feed the seed phrase, if any
    for ix in x_sequence[:-1]:
        sess.run(tf.assign(h_t, next_h), {x_t: [ix]})
    # start generating
    for _ in range(max_length - len(seed_phrase)):
        x_probs, _ = sess.run([next_probs, tf.assign(h_t, next_h)], {x_t: [x_sequence[-1]]})
        x_sequence.append(np.random.choice(n_tokens, p=x_probs[0]))
    return ''.join([tokens[ix] for ix in x_sequence])
In [ ]:
for _ in range(10):
    print(generate_sample())
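To control how adventurous the samples are, a common trick is temperature sampling: sharpen (T < 1) or flatten (T > 1) the predicted distribution before drawing from it. A minimal sketch (the helper below is an illustrative addition, not part of generate_sample above):
In [ ]:
# sketch: sample an index from probs after rescaling the distribution with a temperature
def sample_with_temperature(probs, temperature=1.0):
    logits = np.log(probs + 1e-9) / temperature
    exp_logits = np.exp(logits - logits.max())
    return np.random.choice(len(probs), p=exp_logits / exp_logits.sum())
You could call it on x_probs[0] inside the generation loop instead of sampling from the raw probabilities.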
In [ ]:
for _ in range(50):
    print(generate_sample(' Trump'))
You've just implemented a recurrent language model that can be tasked with generating any kind of sequence, so there's plenty of data you can try it on.
If you're willing to give it a try, you can collect a dataset of your own from the web; Selenium or Scrapy will help with that. Good hunting!
Apart from Keras, there's also a friendly TensorFlow API for recurrent neural nets. It's based around a symbolic loop function (a.k.a. scan).
This interface allows for dynamic sequence lengths and comes with some pre-implemented architectures.
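To make the "symbolic loop" idea concrete, here is a rough sketch of how the manual unroll from earlier could be expressed with tf.scan (time-major inputs; the names below are only for illustration):
In [ ]:
# sketch: let tf.scan carry the hidden state over time instead of a Python for-loop
inputs_time_major = tf.transpose(input_sequence)  # [time, batch]

def scan_step(h_prev, x_t):
    h_next, _ = rnn_one_step(x_t, h_prev)
    return h_next

hidden_states = tf.scan(scan_step, inputs_time_major, initializer=h0)  # [time, batch, rnn_size]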
In [ ]:
class CustomRNN(tf.nn.rnn_cell.BasicRNNCell):
    def call(self, input, state):
        # rnn_one_step returns (next state, next probabilities), but a cell must return (output, next state)
        next_h, next_probas = rnn_one_step(input[:, 0], state)
        return next_probas, next_h

    @property
    def output_size(self):
        return n_tokens


cell = CustomRNN(rnn_size)
input_sequence = tf.placeholder('int32', (None, None))
predicted_probas, last_state = tf.nn.dynamic_rnn(cell, input_sequence[:, :, None],
                                                 time_major=False, dtype='float32')
print(predicted_probas.eval({input_sequence: to_matrix(lines[:10], max_len=50)}).shape)
Note that we never used MAX_LENGTH in the code above: TF will iterate over however many time steps you give it.
You can also use any of the pre-implemented RNN cells:
In [ ]:
for obj in dir(tf.nn.rnn_cell) + dir(tf.contrib.rnn):
    if obj.endswith('Cell'):
        print(obj)
In [ ]:
input_sequence = tf.placeholder('int32', (None, None))
inputs_embedded = embed_x(input_sequence)
cell = tf.nn.rnn_cell.LSTMCell(rnn_size)
state_sequence, last_state = tf.nn.dynamic_rnn(
cell, inputs_embedded, dtype='float32')
print('LSTM visible states[batch, time, unit]:', state_sequence)
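The cell above only builds the LSTM and prints its visible states. To turn those states into next-character probabilities, one option (a sketch; the extra softmax layer is an illustrative addition and would still need training) is to project each state through a dense softmax layer:
In [ ]:
# sketch: map every LSTM state to a distribution over the next character
probas_layer = L.Dense(n_tokens, activation='softmax')
lstm_probas = probas_layer(state_sequence)  # [batch, time, n_tokens]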